 1.大数据高速计算引擎Spark(上)之Spark SQL中Spark SQL编程下的SparkSession
   
   官方文档：http://spark.apache.org/docs/latest/sql-getting-started.html
   在 Spark 2.0 之前：
   SQLContext 是创建 DataFrame 和执行 SQL 的入口
   HiveContext通过Hive SQL语句操作Hive数据，兼容Hive操作，HiveContext继承自
SQLContext
   在 Spark 2.0 之后：
   将这些入口点统一到了SparkSession，SparkSession 封装了SqlContext及HiveContext；
   实现了 SQLContext 及 HiveContext 所有功能；
   通过SparkSession可以获取到SparkContext；

// Entry point for Spark SQL (Spark 2.0+): SparkSession unifies the old
// SQLContext and HiveContext. getOrCreate() reuses an existing session if one exists.
import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

 2.DataFrame & Dataset 的创建
   
   不要刻意区分：DF、DS。DF是一种特殊的DS；ds.transformation => df
   1).由range生成Dataset

// Dataset[Long] of 5, 10, ..., 95 (start = 5, end = 100 exclusive, step = 5)
val numDS = spark.range(5, 100, 5)
// orderBy is a transformation; desc is a function; show is an action
numDS.orderBy(desc("id")).show(5)

// Summary statistics (count, mean, stddev, min, max)
numDS.describe().show

// Print the schema
numDS.printSchema

// The same computation expressed on the underlying RDD
numDS.rdd.map(_.toInt).stats

// Check the number of partitions
numDS.rdd.getNumPartitions
   2).由集合生成Dataset
   Dataset = RDD[case class]
   
case class Person(name:String, age:Int, height:Int)

// Note the element type of the Seq: a Seq[Person] yields a Dataset[Person],
// with the schema derived from the case class fields
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
val ds1 = spark.createDataset(seq1)
// Print the schema
ds1.printSchema
ds1.show

// A Seq of tuples yields a Dataset whose columns are named _1, _2, _3
val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val ds2 = spark.createDataset(seq2)
ds2.show
   3).由集合生成DataFrame
   DataFrame = RDD[Row] + Schema

val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val df1 = spark.createDataFrame(lst).
// withColumnRenamed is convenient when renaming individual columns
withColumnRenamed("_1", "name1").
withColumnRenamed("_2", "age1").
withColumnRenamed("_3", "height1")
df1.orderBy("age1").show(10)

// desc is a function; when used in an IDE it needs this import
import org.apache.spark.sql.functions._
df1.orderBy(desc("age1")).show(10)

// Rename all columns of the DataFrame at once with toDF
val df2 = spark.createDataFrame(lst).toDF("name", "age", "height")
   4).RDD 转成 DataFrame
   DataFrame = RDD[Row] + Schema
   
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))
// One way to build the schema: a list of StructFields (nullable = false)
val schema = StructType( StructField("name", StringType,false) ::
 StructField("age", IntegerType, false) ::
 StructField("height", IntegerType, false) :: Nil)

// Equivalent schema built incrementally with StructType.add,
// giving the column types as strings
val schema1 = (new StructType).
add("name", "string", false).
add("age", "int", false).
add("height", "int", false)

// RDD => DataFrame: a schema must be supplied for RDD[Row]
val rddToDF = spark.createDataFrame(rdd1, schema1)
rddToDF.orderBy(desc("name")).show(false)

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Note the null in the first tuple's third element — used below to
// demonstrate the effect of the schema's nullable flag
val arr1 = Array(("Jack", 28, null), ("Tom", 10, 144),("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr1).map(f=>Row(f._1, f._2, f._3))
val structType = StructType(StructField("name", StringType, false) ::
StructField("age", IntegerType, false) ::
StructField("height", IntegerType, false) :: Nil)

// nullable = false means the field must not contain null
val schema1 = structType
val df1 = spark.createDataFrame(rdd1, schema1)

// The next statement fails at runtime (a null value appears in a
// non-nullable field)
df1.show

// nullable = true allows nulls in that field; this version runs fine
val schema2 = StructType(StructField("name", StringType,false) ::
StructField("age", IntegerType, false) ::
StructField("height", IntegerType, true) :: Nil)

val df2 = spark.createDataFrame(rdd1, schema2)
df2.show

// IDEA中需要，spark-shell中不需要
// Needed in an IDE project; not required in spark-shell
package cn.lagou.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// The case class lives at top level (outside the object) so that Spark's
// reflection-based encoder can derive the Dataset schema from it.
case class Person(name:String, age:Int, height:Int)

object Demo1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Demo1")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("warn")

    import spark.implicits._

    // Tuples can also be converted directly with toDF, naming the columns:
    // val arr2 = Array(("Jack", 28, 150), ("Tom", 10, 144), ("Andy", 16, 165))
    // val rddToDF: DataFrame = sc.makeRDD(arr2).toDF("name", "age", "height")
    // rddToDF.orderBy("age").show(10)
    // rddToDF.orderBy(desc("age")).show(10)

    val people = Array(("Jack", 28, 150), ("Tom", 10, 144), ("Andy", 16, 165))
    val personRDD: RDD[Person] =
      sc.makeRDD(people).map { case (n, a, h) => Person(n, a, h) }

    // toDS/toDF infer the schema by reflection over the Person case class.
    val personDS = personRDD.toDS()
    val personDF = personRDD.toDF()
    personDS.printSchema
    personDF.printSchema
    personDS.orderBy(desc("name")).show(10)
    personDF.orderBy(desc("name")).show(10)

    spark.close()
  }
}

   5).RDD转Dataset
   Dataset = RDD[case class]
   DataFrame = RDD[Row] + Schema

// RDD[Person] => Dataset[Person]: no schema needed, the case class provides it
val ds3 = spark.createDataset(rdd2)
ds3.show(10)
   6).从文件创建DataFrame(以csv文件为例)

// Read with all defaults: no header row, every column typed as string
val df1: DataFrame = spark.read.csv("data/people1.csv")
df1.printSchema()
df1.show()

val df2: DataFrame = spark.read.csv("data/people2.csv")
df2.printSchema()
df2.show()

// Pass reader options: use the first line as header and infer column types.
// NOTE(review): the canonical spelling is "inferSchema"; "inferschema" appears
// to work because Spark treats option keys case-insensitively — confirm.
val df3: DataFrame = spark.read
  .options(Map(("header", "true"), ("inferschema", "true")))
  .csv("data/people1.csv")
df3.printSchema()
df3.show()

// Supply the schema as a DDL string (supported since Spark 2.3.0);
// this file uses ';' as the field delimiter
val schemaStr = "name string, age int, job string"
val df4: DataFrame = spark.read
  .option("header", "true")
  .option("delimiter", ";")
  .schema(schemaStr)
  .csv("data/people2.csv")
df4.printSchema()
df4.show()
spark.close()

 3.三者的转换
   
   SparkSQL提供了一个领域特定语言(DSL)以方便操作结构化数据。核心思想还是
SQL；仅仅是一个语法的问题。